之前我们分析过SharedIndexInformer,SharedIndexInformer提供了一种单reflector多处理函数的informer机制。本次分析将来介绍kube-controller中的sharedInformerFactory。
我们知道,如果一个informer使用一个reflector,那么过多的listwatcher会导致系统负载过重。对同一类资源进行操作的多个informer可以
共享一个reflector,这样将会节省很多资源。而SharedIndexInformer就提供了这样一种机制,允许用户自己在SharedIndexInformer中添加处
理函数。sharedInformerFactory就是利用SharedIndexInformer的这种特性:在sharedInformerFactory中缓存着多个SharedIndexInformer,
如pod资源对应的sharedIndexInformer,现在只要往该sharedIndexInformer中添加处理函数,即可完成多个informer所要求的处理。
sharedInformerFactory定义在/pkg/controller/infoermers/factory.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| type sharedInformerFactory struct { client clientset.Interface lock sync.Mutex defaultResync time.Duration informers map[reflect.Type]cache.SharedIndexInformer startedInformers map[reflect.Type]bool } func NewSharedInformerFactory(client clientset.Interface, defaultResync time.Duration) SharedInformerFactory { return &sharedInformerFactory{ client: client, defaultResync: defaultResync, informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), } }
|
SharedInformerFactory可以生成某资源的sharedInformerFactory,所以在sharedInformerFactory中最重要的字段就是informers,类型为map[reflect.Type]cache.SharedIndexInformer,informers中存储了资源类型和对应sharedIndexInformer的映射关系。
当sharedInformerFactory已经完成初始化,即可以通过Start()方法完成sharedInformerFactory中每个informer的启动:
1 2 3 4 5 6 7 8 9 10 11 12
| func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() for informerType, informer := range f.informers { if !f.startedInformers[informerType] { go informer.Run(stopCh) f.startedInformers[informerType] = true } } }
|
那么sharedInformerFactory是如何完成众多的informer的初始化呢。以pod为例,来看sharedInformerFactory的Pods()方法:
1 2 3 4
| func (f *sharedInformerFactory) Pods() PodInformer { return &podInformer{sharedInformerFactory: f} }
|
Pods()返回了一个podInformer。
所以,现在,我们可以通过sharedInformerFactory.Pods()
获得一个podInformer。
我们以pod为例,看看是如何生成pod的podInformer的。
podInformer定义在/pkg/controller/informer/core.go中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| type podInformer struct { *sharedInformerFactory } func (f *podInformer) Informer() cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() informerType := reflect.TypeOf(&api.Pod{}) informer, exists := f.informers[informerType] if exists { return informer } informer = NewPodInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer }
|
这下明了了,podInformer本质就是一个sharedInformerFactory,其实现了Informer()方法,该方法可以看在sharedInformerFactory是否缓存有pod对应的SharedIndexInformer;如果没有,则调用NewPodInformer()生成一个,并加入到sharedInformerFactory的informers map中。
NewPodInformer()定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { sharedIndexInformer := cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return client.Core().Pods(api.NamespaceAll).List(options) }, WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return client.Core().Pods(api.NamespaceAll).Watch(options) }, }, &api.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) return sharedIndexInformer }
|
NewPodInformer会生成一个以Pod为操作对象的SharedIndexInformer,并返回。
所以现在我们可以通过sharedInformerFactory.Pods().Informer()
获取pod对应的SharedIndexInformer了。
获取到SharedIndexInformer后,就可以通过AddEventHandler()添加处理方法了。具体过程在sharedIndexInformer中已经分析过。
当然在一切准备好之后,一定要运行sharedInformerFactory的Start()方法启动所有的SharedIndexInformer。